[Fix] Issue when using FlyteFile with Elastic #3313
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Tracking issue
Why are the changes needed?
Related to the discussion in the slack channel, when using
FlyteFilewithElastic, the file cannot be downloaded and we will get warning:.venv/lib/python3.12/site-packages/flytekit/types/file/file.py:356: RuntimeWarning: coroutine 'FileAccessProvider.async_get_data' was never awaitedIt seems like the
FlyteFiledoes not play well with underlying multiprocessing spawnBased on my experiment, when we create the
_downloaderpartial function, we use thecontext in main process:
flytekit/flytekit/types/file/file.py
Lines 321 to 326 in eb5a67f
But when we use
Elasticand run theFlyteFilewithin each process via spawn, the memoryis not shared, but our
_downloaderfunction still use main process's context, whichcannot be found in the sub-process. While the context required to convert the coroutine to a
sync call (
loop_manager.synced) isn’t present, the coroutine is returned as-isBelow is the script for testing out the async in spawn with different context. The "Test
2" is which causing problem, and we can find that the things returned from the partial
function is the
coroutine(Partial call is coroutine: {is_coro2}"isTrue), which prove that somehow theloop_manager.synceddoesn't work here.https://github.com/machichima/flytekit_test_scripts/blob/main/elastic-async-fsspec/compare_direct_vs_partial.py
What changes were proposed in this pull request?
Ensure that everytime we call
download, we will get thecurrent_contextof the current process, to ensure calling theget_datasynchronously.How was this patch tested?
Test with following script:
Setup process
Screenshots
Result
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This pull request enhances the FlyteFile class for improved file downloading by implementing a new downloader class. It resolves coroutine warnings in multiprocessing scenarios by ensuring the current process context is used for synchronous calls, thereby increasing reliability and functionality.